1. Data Engineering Cheat Sheet

2. Documentation Links

3. Tutorials

4. Cheat Sheets

PySpark Structured Streaming Triggers

https://medium.com/@kiranvutukuri/trigger-modes-in-apache-spark-structured-streaming-part-6-91107a69de39

Minimal test code

df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "espresso-machine-events")\
    .option("startingOffsets", "earliest")\
    .load()
    .option("maxOffsetsPerTrigger", 5)\
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

# Define the streaming query and output to the console.
# A query accepts exactly one trigger; alternatives to processingTime are:
#   .trigger(once=True)             - single micro-batch, then stop
#   .trigger(availableNow=True)     - drain all available data in batches, then stop
#   .trigger(continuous="1 second") - experimental low-latency continuous processing
query = df.writeStream\
    .outputMode("append")\
    .format("console")\
    .trigger(processingTime="2 seconds")\
    .option("checkpointLocation", str(data_path / "tmp/checkpoints/processingTime"))\
    .start()
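
If the script should stay alive while the console sink prints batches, block on the query handle; a minimal sketch:

# Block until the stream stops, or poll with a timeout and stop it explicitly.
if not query.awaitTermination(timeout=60):   # False means still running after 60 s
    query.stop()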

Test messages were produced at a rate of 1 message per second.

Trigger Mode                        | Checkpoint                                    | maxOffsetsPerTrigger | Output
trigger(once=True)                  | No                                            | Ignored              | Entire topic content in Batch 0
trigger(once=True)                  | Yes                                           | Ignored              | Batch 0: entire topic content at start time. Batch 1: messages produced after Batch 0.
trigger(availableNow=True)          | N/A                                           | 5                    | 5 elements per batch. Terminates at the last offset available when the query started.
trigger(processingTime="5 seconds") | N/A                                           | 2                    | Triggers a batch every 5 seconds; each batch contains only 2 elements.
trigger(processingTime="5 seconds") | N/A                                           | 10                   | Triggers a batch every 5 seconds; each batch contains all new elements, since maxOffsetsPerTrigger=10 accommodates every message produced since the last batch.
trigger(continuous="1 second")      | Created automatically in /tmp if not provided | Ignored              | Runs continuously to process each message with low latency and decides on its own how many messages go into each batch.
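
For reference, the 1 message/s test feed can be generated with a small producer. A minimal sketch, assuming the kafka-python package and the same espresso-machine-events topic:

import json
import time

from kafka import KafkaProducer

# Send one JSON-encoded event per second to the test topic.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
for i in range(100):
    producer.send("espresso-machine-events", {"event_id": i, "ts": time.time()})
    time.sleep(1)
producer.flush()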

4.1. Databricks Python API vs SQL

Data Engineering Task     | Python API Example                                                          | SQL Example
Load Data                 | df = spark.read.csv("data.csv", header=True)                               | CREATE OR REPLACE TEMPORARY VIEW data USING csv OPTIONS (path 'data.csv', header 'true')
Data Cataloging           | spark.catalog.listTables("my_database")                                    | SHOW TABLES IN my_database
Data Lineage Tracking     | spark.sql("DESCRIBE HISTORY my_table")                                     | DESCRIBE HISTORY my_table
Streaming Data Processing | df = spark.readStream.format("kafka").option("subscribe", "topic1").load() | CREATE STREAMING TABLE data_stream AS SELECT * FROM kafka.`topic1`
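
Note that DESCRIBE HISTORY only works on Delta tables. A minimal sketch, assuming Delta Lake is enabled and my_table is a throwaway example name:

# Create a small Delta table, then inspect its version history.
spark.range(10).write.format("delta").mode("overwrite").saveAsTable("my_table")
spark.sql("DESCRIBE HISTORY my_table").select("version", "operation", "timestamp").show()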

4.2. PySpark DataFrame API vs SQL

To run SQL against a DataFrame, first register it as a temporary view:

    df.createOrReplaceTempView("data_view")
    result = spark.sql("SELECT col1 FROM data_view")
Use Case          | Python API                                         | SQL
Load Data         | df = spark.read.csv("data.csv", header=True)       | CREATE OR REPLACE TEMPORARY VIEW data USING csv OPTIONS (path 'data.csv', header 'true')
Select Columns    | df.select("col1", "col2")                          | SELECT col1, col2 FROM data
Filter Rows       | df.filter(df["col1"] > 10)                         | SELECT * FROM data WHERE col1 > 10
Add Column        | df.withColumn("new_col", df["col1"] * 2)           | SELECT *, col1 * 2 AS new_col FROM data
Group & Aggregate | df.groupBy("col1").agg({"col2": "sum"})            | SELECT col1, SUM(col2) AS total FROM data GROUP BY col1
Join Tables       | df1.join(df2, df1["key"] == df2["key"], "inner")   | SELECT * FROM df1 INNER JOIN df2 ON df1.key = df2.key
Sort Data         | df.orderBy(df["col1"].desc())                      | SELECT * FROM data ORDER BY col1 DESC
Write Data        | df.write.format("parquet").save("output.parquet")  | CREATE TABLE output USING parquet LOCATION 'output.parquet' AS SELECT * FROM data
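
As a quick sanity check, the same aggregation can be written both ways. A minimal sketch using the placeholder column names from the table:

# DataFrame API version
api_result = df.groupBy("col1").agg({"col2": "sum"})

# SQL version on the registered temp view
df.createOrReplaceTempView("data")
sql_result = spark.sql("SELECT col1, SUM(col2) AS total FROM data GROUP BY col1")

# Both return DataFrames with the same rows; only the aggregate column name
# differs (sum(col2) vs. total).
api_result.show()
sql_result.show()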

The two approaches can be mixed via df.selectExpr(), which accepts SQL expressions on DataFrame columns:

df.selectExpr("col1", "col2 AS renamed_col").show()

5. Snippets

5.1. Multihop streaming from Kafka

raw_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .load()


# Bronze hop: persist the raw Kafka stream as-is to a Delta table.
bronze_writer_query = raw_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpointLocation) \
    .option("path", bronze_path) \
    .trigger(availableNow=True) \
    .start()
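
The next hop follows the same pattern. A minimal sketch, assuming silver_path and silver_checkpoint are defined analogously to bronze_path, and that casting key/value to strings is all the parsing needed:

# Silver hop: stream from the bronze Delta table, apply light parsing,
# and persist the result to the silver layer.
silver_stream = spark.readStream \
    .format("delta") \
    .load(bronze_path) \
    .selectExpr("CAST(key AS STRING) AS key",
                "CAST(value AS STRING) AS value",
                "timestamp")

silver_writer_query = silver_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", silver_checkpoint) \
    .option("path", silver_path) \
    .trigger(availableNow=True) \
    .start()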